E-commerce Data Warehouse V2.0 (01 User Behavior Collection Platform)


1. Data Warehouse Concepts

2. Project Requirements and Architecture Design

3. Data Generation Module

Run the log generator:

java -classpath logcollecter-1.0-SNAPSHOT-jar-with-dependencies.jar com.shuofxz.appclient.AppMain > /dev/null 2>&1
# or (the main class of the packaged jar must be AppMain)
java -jar logcollecter-1.0-SNAPSHOT-jar-with-dependencies.jar > /dev/null 2>&1

Change the system time, in order to generate logs for different dates.

(This should probably be handled in the Java code instead.)

# Change the system time
sudo date -s 2021-01-11

4. Data Collection Module

Hadoop

  • Configure LZO compression support (hold off on this for now; it requires a Hadoop build compiled with LZO)

Install the LZO libraries first:

yum install lzo-devel lzop

Place the compiled hadoop-lzo-0.4.20.jar into hadoop-2.7.2/share/hadoop/common/.

Edit etc/hadoop/core-site.xml:

<property>
<name>io.compression.codecs</name>
<value>
org.apache.hadoop.io.compress.GzipCodec,
org.apache.hadoop.io.compress.DefaultCodec,
org.apache.hadoop.io.compress.BZip2Codec,
org.apache.hadoop.io.compress.SnappyCodec,
com.hadoop.compression.lzo.LzoCodec,
com.hadoop.compression.lzo.LzopCodec
</value>
</property>
<property>
<name>io.compression.codec.lzo.class</name>
<value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

Sync the jar and core-site.xml to the other machines (a sketch follows).
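
A minimal distribution sketch, assuming hadoop102 is the current node, hadoop103/hadoop104 are the other nodes, and passwordless SSH is set up (many setups use an xsync helper for this instead; the script name here is hypothetical):

#!/bin/bash
# sync_lzo.sh (hypothetical helper) - copy the hadoop-lzo jar and core-site.xml to the other nodes
for host in hadoop103 hadoop104; do
  echo "====== syncing to $host ======"
  scp /opt/module/hadoop-2.7.2/share/hadoop/common/hadoop-lzo-0.4.20.jar \
      $host:/opt/module/hadoop-2.7.2/share/hadoop/common/
  scp /opt/module/hadoop-2.7.2/etc/hadoop/core-site.xml \
      $host:/opt/module/hadoop-2.7.2/etc/hadoop/
done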

  • Stress test (a sketch follows)
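
One common HDFS read/write benchmark is the TestDFSIO job that ships with Hadoop; a sketch, assuming the stock Hadoop 2.7.2 layout (flag spellings can vary slightly between versions):

# Write benchmark: 10 files of 128 MB each
hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.2-tests.jar \
  TestDFSIO -write -nrFiles 10 -fileSize 128MB
# Read benchmark over the same files
hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.2-tests.jar \
  TestDFSIO -read -nrFiles 10 -fileSize 128MB
# Clean up the benchmark output
hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.2-tests.jar \
  TestDFSIO -clean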

Zookeeper
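
The cluster.sh script later in this post calls a zk-all.sh group start/stop helper that is never shown; a minimal sketch, assuming ZooKeeper lives at /opt/module/zookeeper-3.4.10 on hadoop102-104:

#!/bin/bash
# zk-all.sh <start|stop|status> - run zkServer.sh with the given action on every ZooKeeper node
for i in hadoop102 hadoop103 hadoop104; do
  echo "========== zookeeper $i $1 =========="
  ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh $1"
done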

Flume collection module

  • Configure conf/flume-env.sh
# Add:
export JAVA_HOME=/opt/module/jdk1.8.0_144
# HADOOP_HOME must be set; otherwise Flume fails at startup because it cannot find the Hadoop libs
export HADOOP_HOME=/opt/module/hadoop-2.7.2
  • Add a Flume configuration file that defines the source, interceptors, channel selector, and channels

conf/file-flume-kafka.conf

a1.sources=r1
a1.channels=c1 c2

# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume-1.7.0-bin/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2

#interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.shuofxz.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.shuofxz.flume.interceptor.LogTypeInterceptor$Builder

a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2

# configure channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = topic_start
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c2.kafka.topic = topic_event
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer
  • Write the Java code for the two custom interceptors referenced above: the ETL interceptor (LogETLInterceptor) and the log-type interceptor (LogTypeInterceptor, which sets the topic header used by the multiplexing selector)
  • Edit conf/log4j.properties
# A relative path seems to cause problems, and the logs directory under this path apparently has to be created in advance
# Question: why change this at all? (flume.log.dir only controls where Flume writes its own log4j output, so an absolute path keeps the agent logs in one fixed place regardless of the working directory)
flume.log.dir=/opt/module/flume-1.7.0-bin/logs
  • Group start/stop script for the collection-tier Flume agents

flume-f1.sh

#!/bin/bash

case $1 in
"start"){
for i in hadoop102 hadoop103
do
echo " --------启动 $i 采集flume-------"
ssh $i "nohup /opt/module/flume-1.7.0-bin/bin/flume-ng agent --conf /opt/module/flume-1.7.0-bin/conf --conf-file /opt/module/flume-1.7.0-bin/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume-1.7.0-bin/flume.log 2>&1 &"
done
};;
"stop"){
for i in hadoop102 hadoop103
do
echo " --------停止 $i 采集flume-------"
ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs kill"
done

};;
esac

Note:

# This option is required; without it Flume does not run properly
-Dflume.root.logger=INFO,LOGFILE >/opt/module/flume-1.7.0-bin/flume.log 2>&1

Note:

In flume-env.sh, Flume's options must point it at the Hadoop common directory (the HADOOP_HOME export above takes care of this); otherwise the agent reports an error at startup.

Kafka log collection

  • Once Flume is started it writes data into Kafka, and the topics are created automatically
  • Test: run the gen_logs.sh log-generation script created earlier; if new data arrives in Kafka, the log generation -> Flume -> Kafka chain is working
  • Stress test
# Producer
bin/kafka-producer-perf-test.sh --topic test --record-size 100 --num-records 100000 --throughput -1 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092

# Consumer
bin/kafka-consumer-perf-test.sh --broker-list hadoop102:9092 --topic test --fetch-size 10000 --messages 10000000 --threads 1
  • List topics
bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
  • Consume messages
bin/kafka-console-consumer.sh \
--bootstrap-server hadoop102:9092 --from-beginning --topic topic_start

bin/kafka-console-consumer.sh \
--bootstrap-server hadoop102:9092 --topic topic_start
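
Similarly, the cluster.sh script below relies on a kafka-all.sh group start/stop helper that is not shown; a minimal sketch, assuming Kafka is installed at /opt/module/kafka_2.12-2.3.1 on hadoop102-104:

#!/bin/bash
# kafka-all.sh <start|stop> - start or stop the Kafka broker on every node
case $1 in
"start")
  for i in hadoop102 hadoop103 hadoop104; do
    echo "========== kafka $i start =========="
    ssh $i "/opt/module/kafka_2.12-2.3.1/bin/kafka-server-start.sh -daemon /opt/module/kafka_2.12-2.3.1/config/server.properties"
  done
;;
"stop")
  for i in hadoop102 hadoop103 hadoop104; do
    echo "========== kafka $i stop =========="
    ssh $i "/opt/module/kafka_2.12-2.3.1/bin/kafka-server-stop.sh"
  done
;;
esac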

Storing Kafka data in HDFS

  • Configuration file

conf/kafka-flume-hdfs.conf

## Components
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_start
# Add these to re-read data, i.e. to re-consume the Kafka topic from the beginning
#a1.sources.r1.groupId=f2_1
#a1.sources.r1.kafka.consumer.auto.offset.reset = earliest

## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r2.kafka.topics=topic_event
#a1.sources.r2.groupId=f2_1
#a1.sources.r2.kafka.consumer.auto.offset.reset = earliest

## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume-1.7.0-bin/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume-1.7.0-bin/data/behavior1/
a1.channels.c1.keep-alive = 6

## channel2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /opt/module/flume-1.7.0-bin/checkpoint/behavior2
a1.channels.c2.dataDirs = /opt/module/flume-1.7.0-bin/data/behavior2/
a1.channels.c2.keep-alive = 6

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-

##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-

## Avoid producing lots of small files; in production set rollInterval to 3600
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0

## Keep the output files as plain native files (LZO compression is not configured on HDFS here, so these are all commented out)
#a1.sinks.k1.hdfs.fileType = CompressedStream
#a1.sinks.k2.hdfs.fileType = CompressedStream

#a1.sinks.k1.hdfs.codeC = lzop
#a1.sinks.k2.hdfs.codeC = lzop

## Wiring
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
  • Script flume-f2.sh
#!/bin/bash

case $1 in
"start"){
for i in hadoop104
do
echo " --------启动 $i 消费flume-------"
ssh $i "nohup /opt/module/flume-1.7.0-bin/bin/flume-ng agent --conf-file /opt/module/flume-1.7.0-bin/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume-1.7.0-bin/logs/f2_log.txt 2>&1 &"
done
};;
"stop"){
for i in hadoop104
do
echo " --------停止 $i 消费flume-------"
ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs kill"
done

};;
esac
  • Cluster start/stop script cluster.sh
#! /bin/bash

case $1 in
"start"){
echo " -------- starting the cluster -------"

echo " -------- starting the Hadoop cluster -------"
/opt/module/hadoop-2.7.2/sbin/start-dfs.sh
ssh hadoop103 "/opt/module/hadoop-2.7.2/sbin/start-yarn.sh"

# Start the ZooKeeper cluster
/opt/module/zookeeper-3.4.10/bin/zk-all.sh start

for (( i=0; i<6; i++ ));do
echo "wait... $i"
sleep 1s;
done

# Start the Flume collection tier
/opt/module/flume-1.7.0-bin/bin/flume-f1.sh start

# Start the Kafka cluster
/opt/module/kafka_2.12-2.3.1/bin/kafka-all.sh start

sleep 6s;

# Start the Flume consumer tier
/opt/module/flume-1.7.0-bin/bin/flume-f2.sh start

};;
"stop"){
echo " -------- stopping the cluster -------"

# Stop the Flume consumer tier
/opt/module/flume-1.7.0-bin/bin/flume-f2.sh stop

# Stop the Kafka cluster
/opt/module/kafka_2.12-2.3.1/bin/kafka-all.sh stop

sleep 10s;

# Stop the Flume collection tier
/opt/module/flume-1.7.0-bin/bin/flume-f1.sh stop

# Stop the ZooKeeper cluster
/opt/module/zookeeper-3.4.10/bin/zk-all.sh stop

echo " -------- stopping the Hadoop cluster -------"
ssh hadoop103 "/opt/module/hadoop-2.7.2/sbin/stop-yarn.sh"
/opt/module/hadoop-2.7.2/sbin/stop-dfs.sh
};;
esac
  • Stop the cluster
cluster.sh stop
  • Change the system time: change_date.sh
change_date.sh 2020-03-10
#!/bin/bash

for i in hadoop102 hadoop103 hadoop104
do
echo "========== Change Date $i =========="
ssh -t $i "sudo date -s $1"
done
  • Start the cluster
cluster.sh start
  • Generate logs
gen_logs.sh
#!/bin/bash

for i in hadoop102 hadoop103;do
echo "====== generte $i log ======="
ssh $i "java -jar /opt/project/data_warehouse_v2/logcollecter-1.0-SNAPSHOT-jar-with-dependencies.jar $1 $2 > /dev/null 2>&1 &"
done
  • Check the results in HDFS

After startup, the logs flow file -> Flume -> Kafka -> Flume -> HDFS and end up under the configured HDFS paths /origin_data/gmall/log/topic_start/%Y-%m-%d and /origin_data/gmall/log/topic_event/%Y-%m-%d; a quick check is sketched below.
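
A quick sanity check from the command line (the date directory matches whatever system date was set when the logs were generated, e.g. 2021-01-11):

# List the files written by the consumer Flume agent for that day
hadoop fs -ls /origin_data/gmall/log/topic_start/2021-01-11
hadoop fs -ls /origin_data/gmall/log/topic_event/2021-01-11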

  • Note

If you want to delete the previously collected data and re-collect: 1) when deleting the Kafka topics, make sure they are removed completely, and check in ZooKeeper whether any record of the topics remains; 2) the consumer Flume agent has to re-consume the data. A sketch of the relevant commands follows.
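
A sketch of the cleanup, assuming delete.topic.enable=true on the brokers (otherwise the topics are only marked for deletion) and that the Kafka commands are run from the Kafka install directory:

# Delete the topics completely
bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic topic_start
bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic topic_event

# Check in ZooKeeper that no record of the topics is left
# (or run `ls /brokers/topics` from an interactive zkCli.sh session)
/opt/module/zookeeper-3.4.10/bin/zkCli.sh -server hadoop102:2181 ls /brokers/topics

# Remove the old raw data from HDFS before re-collecting
hadoop fs -rm -r /origin_data/gmall/log/topic_start /origin_data/gmall/log/topic_event

To make the consumer Flume agent re-read everything, enable the commented-out groupId / kafka.consumer.auto.offset.reset=earliest lines in kafka-flume-hdfs.conf (or switch to a new consumer group) before restarting it.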